package com.flink.utils;

import com.flink.dto.OverSpeedCarInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * Flink sink that inserts each {@link OverSpeedCarInfo} record into the
 * {@code t_over_speed_car_info} table through a single JDBC connection.
 *
 * <p>The connection and prepared statement are created in {@link #open(Configuration)},
 * reused for every {@link #invoke(OverSpeedCarInfo, Context)} call, and released in
 * {@link #close()}. Each record is written with its own {@code executeUpdate()} call
 * (no batching).
 */
public class JdbcSink extends RichSinkFunction<OverSpeedCarInfo> {
    // NOTE(review): connection settings (including the password) are hard-coded defaults;
    // prefer supplying them through the three-argument constructor from external config.
    private static final String DEFAULT_URL = "jdbc:mysql://101.35.91.29:3399/flink-card";
    private static final String DEFAULT_USER = "root";
    private static final String DEFAULT_PASSWORD = "123456";

    private static final String INSERT_SQL = "INSERT INTO `flink-card`.`t_over_speed_car_info` ( `area_id`, `road_id`, `monitor_id`, `card_no`, `speed`, `limit_speed`, `create_time`) VALUES (?,?,?,?,?,?,?)";

    private final String url;
    private final String user;
    private final String password;

    // Runtime-only JDBC handles: created in open(), so they are excluded from serialization.
    transient Connection connection;
    transient PreparedStatement pst;

    /** Creates a sink that uses the built-in default connection settings. */
    public JdbcSink() {
        this(DEFAULT_URL, DEFAULT_USER, DEFAULT_PASSWORD);
    }

    /**
     * Creates a sink that connects with the given settings.
     *
     * @param url      JDBC URL passed to {@link DriverManager#getConnection(String, String, String)}
     * @param user     database user name
     * @param password database password
     */
    public JdbcSink(String url, String user, String password) {
        this.url = url;
        this.user = user;
        this.password = password;
    }

    /**
     * Opens the JDBC connection and prepares the insert statement.
     *
     * @param parameters configuration forwarded to the superclass
     * @throws Exception if the connection cannot be opened or the statement cannot be prepared
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = DriverManager.getConnection(url, user, password);
        pst = connection.prepareStatement(INSERT_SQL);
    }

    /**
     * Releases the prepared statement and the connection.
     *
     * <p>Safe to call when {@link #open(Configuration)} failed part-way or was never
     * called: resources that are still {@code null} are skipped, and the connection is
     * closed even if closing the statement throws.
     *
     * @throws Exception if closing a JDBC resource fails
     */
    @Override
    public void close() throws Exception {
        // try-with-resources closes in reverse order (statement, then connection) and
        // ignores resources that are null.
        try (Connection conn = connection; PreparedStatement stmt = pst) {
            super.close();
        } finally {
            pst = null;
            connection = null;
        }
    }

    /**
     * Binds the fields of one record to the insert statement and executes it.
     *
     * @param value   the record to persist
     * @param context sink context, forwarded to the superclass
     * @throws Exception if binding or executing the statement fails
     */
    @Override
    public void invoke(OverSpeedCarInfo value, Context context) throws Exception {
        pst.setString(1, value.getAreaId());
        pst.setString(2, value.getRoadId());
        pst.setString(3, value.getMonitorId());
        pst.setString(4, value.getCardNo());
        pst.setDouble(5, value.getSpeed());
        pst.setDouble(6, value.getLimitSpeed());
        // NOTE(review): java.sql.Date is bound as a SQL DATE, so only the calendar date is
        // stored. If create_time is a DATETIME/TIMESTAMP column, setTimestamp would keep
        // the time of day — confirm against the table schema.
        pst.setDate(7, new Date(value.getCreateTime().getTime()));
        pst.executeUpdate();
        super.invoke(value, context);
    }
}
